import os
import scipy
from scipy import signal
import numpy as np
import cv2
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import pickle
from collections import deque
import glob
import imageio
imageio.plugins.ffmpeg.download()
from moviepy.editor import VideoFileClip
from IPython.display import HTML
%matplotlib inline
out_dir = './output_images/camera_calib/'

# Chessboard geometry: number of inner corners per row / column.
nx = 9  # inside corners along x
ny = 6  # inside corners along y

# Accumulators for the calibration correspondences across all images.
objpoints = []  # 3d points in real world space
imgpoints = []  # 2d points in image plane.

# Template of board coordinates (0,0,0), (1,0,0), ... (nx-1, ny-1, 0);
# z stays 0 because the chessboard is planar.
objp = np.zeros((nx * ny, 3), np.float32)
objp[:, :2] = np.mgrid[0:nx, 0:ny].T.reshape(-1, 2)
# Detect chessboard corners in all 20 calibration images, accumulate the
# object/image point pairs, and save annotated copies for inspection.
plt.figure(figsize=(12, 30))
notfindimg = cv2.imread('./output_images/cannot_find_corner.png')
for idx in range(20):
    plt.subplot(10, 2, idx + 1)
    fname = './camera_cal/calibration' + str(idx + 1) + '.jpg'
    print(fname)
    img = cv2.imread(fname)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    ret, corners = cv2.findChessboardCorners(gray, (nx, ny), None)
    savefname = 'corners_found' + str(idx + 1) + '.jpg'
    if ret:
        # Detection succeeded: record the correspondence pair and save
        # the annotated image.
        objpoints.append(objp)
        imgpoints.append(corners)
        cv2.drawChessboardCorners(img, (nx, ny), corners, ret)
        write_name = out_dir + savefname
        cv2.imwrite(write_name, img)
        print(write_name)
        plt.title(savefname, fontsize=10)
        plt.axis('off')
        plt.imshow(img)
    else:
        # Board not found (e.g. corners cut off at the edge) -- show a
        # placeholder so the subplot grid stays aligned.
        print('cannot find corner..')
        plt.title(savefname, fontsize=10)
        plt.axis('off')
        plt.imshow(notfindimg)
# Calibrate the camera from the collected correspondences, undistort a
# sample image for a visual sanity check, and pickle the calibration.
img = cv2.imread('camera_cal/calibration1.jpg')
img_size = (img.shape[1], img.shape[0])
ret, mtx, dist, rvecs, tvecs = cv2.calibrateCamera(objpoints, imgpoints, img_size, None, None)
dst = cv2.undistort(img, mtx, dist, None, mtx)

fig, (axl, axr) = plt.subplots(1, 2, figsize=(20, 10))
axl.imshow(img)
axl.set_title('Original Image', fontsize=30)
axr.imshow(dst)
axr.set_title('Undistorted Image', fontsize=30)

# Persist only mtx/dist -- rvecs/tvecs are not needed downstream.
dist_pickle = {"mtx": mtx, "dist": dist}
pickle.dump(dist_pickle, open(out_dir + "camera_dist_pickle.p", "wb"))
# Visualize undistortion on all calibration images, reloading the
# calibration from the pickle written above.
plt.figure(figsize=(12, 30))
with open(out_dir + 'camera_dist_pickle.p', mode='rb') as f:
    dist_pickle = pickle.load(f)
    mtx = dist_pickle["mtx"]
    dist = dist_pickle["dist"]
for i in range(20):
    plt.subplot(10, 2, i + 1)
    fname = './camera_cal/calibration' + str(i + 1) + '.jpg'
    print(fname)
    img = cv2.imread(fname)
    dst = cv2.undistort(img, mtx, dist, None, mtx)
    savefname = 'undistortion' + str(i + 1) + '.jpg'
    write_name = out_dir + savefname
    cv2.imwrite(write_name, dst)
    plt.title(savefname, fontsize=10)
    plt.axis('off')
    # BUG FIX: previously displayed the original `img` here, while the
    # title/filename refer to the undistorted result; show `dst`.
    plt.imshow(dst)
def undistort(img):
    """Remove lens distortion from `img` using the module-level camera
    calibration (`mtx`, `dist`) computed above."""
    return cv2.undistort(img, mtx, dist, None, mtx)
def binarize(img, s_thresh=(120, 255), sx_thresh=(20, 255), l_thresh=(40, 255)):
    """Threshold an RGB road image into a lane-pixel mask.

    A pixel is kept when (lightness AND saturation pass their thresholds)
    OR the x-gradient of lightness passes its threshold.

    Args:
        img: RGB image (H, W, 3).
        s_thresh: (lo, hi) inclusive range for the HLS saturation channel.
        sx_thresh: (lo, hi) inclusive range for the scaled sobel-x gradient.
        l_thresh: (lo, hi) inclusive range for the HLS lightness channel.

    Returns:
        (binary, channels): `binary` is a 3-channel uint8 mask (0 or 255)
        of lane candidates; `channels` stacks the l / sobel-x / s masks
        into one 3-channel uint8 debug image.
    """
    img = np.copy(img)
    # HLS separates lightness from saturation, which is robust to shadows.
    # NOTE: np.float was removed in NumPy 1.24 -- use np.float64 explicitly.
    hls = cv2.cvtColor(img, cv2.COLOR_RGB2HLS).astype(np.float64)
    l_channel = hls[:, :, 1]
    s_channel = hls[:, :, 2]
    # Sobel in x on lightness accentuates near-vertical edges (lane lines).
    sobelx = cv2.Sobel(l_channel, cv2.CV_64F, 1, 0)
    abs_sobelx = np.absolute(sobelx)
    peak = np.max(abs_sobelx)
    if peak > 0:
        scaled_sobel = np.uint8(255 * abs_sobelx / peak)
    else:
        # Guard against a perfectly flat image (the original divided by 0).
        scaled_sobel = np.zeros_like(abs_sobelx, dtype=np.uint8)
    # Threshold x gradient
    sxbinary = np.zeros_like(scaled_sobel)
    sxbinary[(scaled_sobel >= sx_thresh[0]) & (scaled_sobel <= sx_thresh[1])] = 1
    # Threshold saturation channel
    s_binary = np.zeros_like(s_channel)
    s_binary[(s_channel >= s_thresh[0]) & (s_channel <= s_thresh[1])] = 1
    # Threshold lightness
    l_binary = np.zeros_like(l_channel)
    l_binary[(l_channel >= l_thresh[0]) & (l_channel <= l_thresh[1])] = 1
    # Per-criterion debug visualization (one criterion per color channel).
    channels = 255 * np.dstack((l_binary, sxbinary, s_binary)).astype('uint8')
    binary = np.zeros_like(sxbinary)
    binary[((l_binary == 1) & (s_binary == 1)) | (sxbinary == 1)] = 1
    binary = 255 * np.dstack((binary, binary, binary)).astype('uint8')
    return binary, channels
# Demonstrate binarization on a curved-road test image.
img = plt.imread('test_images/test5.jpg')
shape = img.shape
binary, channels = binarize(img)
plt.imsave(out_dir + 'S_binary.jpg', binary)
plt.imsave(out_dir + 'HLS_channels.jpg', channels)

# Side-by-side: combined mask vs per-channel masks.
fig, (axl, axr) = plt.subplots(1, 2, figsize=(24, 9))
fig.tight_layout()
axl.imshow(binary)
axl.set_title('S channel Binary of HLS Color', fontsize=40)
axr.imshow(channels)
axr.set_title('HLS_Channels', fontsize=40)
def warp(img, tobird=True):
    """Perspective-warp between the camera view and a bird's-eye view.

    Args:
        img: input image.
        tobird: True maps camera -> bird's-eye; False applies the
            inverse mapping (bird's-eye -> camera).

    Returns:
        (warped_image, transform_matrix)
    """
    # Trapezoid traced along a straight-lane image (source region) ...
    corners = np.float32([[190, 720], [589, 457], [698, 457], [1145, 720]])
    top_left = np.array([corners[0, 0], 0])
    top_right = np.array([corners[3, 0], 0])
    offset = [150, 0]
    # ... and the rectangle it maps to in the top-down view.
    src = np.float32([corners[0], corners[1], corners[2], corners[3]])
    dst = np.float32([corners[0] + offset, top_left + offset,
                      top_right - offset, corners[3] - offset])
    if tobird:
        M = cv2.getPerspectiveTransform(src, dst)
    else:
        M = cv2.getPerspectiveTransform(dst, src)
    size = (img.shape[1], img.shape[0])
    warped = cv2.warpPerspective(img, M, size, flags=cv2.INTER_LINEAR)
    return warped, M
# Draw the source trapezoid on the straight-lane image and show how it
# maps to the bird's-eye view.
img = plt.imread('./test_images/straight_lines1.jpg')
corners = np.float32([[190, 720], [589, 457], [698, 457], [1145, 720]])
img = cv2.undistort(img, mtx, dist, None, mtx)
imshape = img.shape
# BUG FIX: cv2.line requires integer pixel coordinates; passing tuples of
# np.float32 raises on modern OpenCV, so cast each coordinate to int.
corner_tuples = [tuple(int(v) for v in c) for c in corners]
for a, b in ((0, 1), (1, 2), (2, 3), (3, 0)):
    cv2.line(img, corner_tuples[a], corner_tuples[b], color=[255, 0, 0], thickness=1)
warped, _ = warp(img)
plt.imsave(out_dir + 'lane_roi.jpg', img)
plt.imsave(out_dir + 'lane_roi_warped.jpg', warped)
# Plot the result
f, (ax1, ax2) = plt.subplots(1, 2, figsize=(24, 9))
f.tight_layout()
ax1.imshow(img)
ax1.set_title('Original image and roi area(red)', fontsize=40)
ax2.imshow(warped)
ax2.set_title('Warped image and roi area(red)', fontsize=40)
def region_of_interest(img):
    """
    Mask everything outside a fixed trapezoidal region of interest.

    The polygon spans the full top edge and narrows toward the bottom
    (one seventh of the width trimmed from each side); pixels outside it
    are set to black.
    """
    h, w = img.shape[0], img.shape[1]
    vertices = np.array([[(0, 0), (w, 0), (w, 0), (6 * w / 7, h),
                          (w / 7, h), (0, 0)]], dtype=np.int32)
    mask = np.zeros_like(img)
    # The fill colour must match the channel count of the input image.
    if len(img.shape) > 2:
        fill_color = (255,) * img.shape[2]
    else:
        fill_color = 255
    cv2.fillPoly(mask, vertices, fill_color)
    # Keep the image only where the mask is nonzero.
    return cv2.bitwise_and(img, mask)
def warp_pipeline(img):
    """Undistort, warp to bird's-eye view, and apply the ROI mask."""
    warped, _ = warp(undistort(img))
    return region_of_interest(warped)
def warp_binarize_pipeline(img):
    """Undistort, binarize, warp to bird's-eye view, and apply the ROI mask."""
    lane_mask, _ = binarize(undistort(img))
    warped, _ = warp(lane_mask)
    return region_of_interest(warped)
# Compare the plain warped ROI with its binarized counterpart for the
# image currently in `img`.
warp_roi = warp_pipeline(img)
warp_binary_roi = warp_binarize_pipeline(img)
plt.imsave(out_dir + 'warp_roi.jpg', warp_roi)
plt.imsave(out_dir + 'warp_binary_roi.jpg', warp_binary_roi)

fig, (axl, axr) = plt.subplots(1, 2, figsize=(24, 9))
fig.tight_layout()
axl.imshow(warp_roi)
axl.set_title('Warped ROI', fontsize=40)
axr.imshow(warp_binary_roi)
axr.set_title('Warped binary ROI', fontsize=40)
# Apply both pipelines to a curved-road test image; `warped_binary` is
# reused by the lane-extraction demo below.
img = plt.imread('./test_images/test5.jpg')
warped = warp_pipeline(img)
warped_binary = warp_binarize_pipeline(img)
plt.imsave(out_dir + 'warp_test5.jpg', warped)
plt.imsave(out_dir + 'warp_binary_test5.jpg', warped_binary)

fig, (axl, axr) = plt.subplots(1, 2, figsize=(24, 9))
fig.tight_layout()
axl.imshow(warped)
axl.set_title('Warped ROI', fontsize=40)
axr.imshow(warped_binary)
axr.set_title('Warped binary ROI', fontsize=40)
def find_peaks(img, thresh):
    """Locate candidate lane-line x-positions in a warped binary image.

    Sums channel 0 over the bottom half of the image, smooths the column
    histogram with a gaussian, and finds peaks via a continuous wavelet
    transform.

    Args:
        img: 3-channel binary image (lane pixels are bright in channel 0).
        thresh: minimum smoothed-histogram height for a peak to count.

    Returns:
        (peaks, filtered): peak x-positions (np.ndarray of column
        indices) and the smoothed column histogram.
    """
    # FIX: the scipy.ndimage.filters namespace was deprecated and has
    # been removed; import the function from scipy.ndimage directly.
    from scipy.ndimage import gaussian_filter1d
    half_ss = int(img.shape[0] / 2)
    img_half = img[half_ss:, :, 0]  # bottom half: closest to the car
    data = np.sum(img_half, axis=0)
    filtered = gaussian_filter1d(data, 20)
    peak_ind = signal.find_peaks_cwt(filtered, np.arange(20, 300))
    peaks = np.array(peak_ind)
    # Discard weak peaks (noise, partial detections).
    peaks = peaks[filtered[peak_ind] > thresh]
    return peaks, filtered
def get_next_window(img, center_point, width):
    """
    input: img,center_point,width
    img: binary 3 channel image
    center_point: center of window
    width: width of window
    output: masked,center_point
    masked : a masked image of the same size. mask is a window centered at center_point
    center : the mean ofall pixels found within the window
    """
    ny, nx, _ = img.shape
    mask = np.zeros_like(img)
    # Clamp the window so it stays fully inside the image.
    if center_point <= width / 2:
        center_point = width / 2
    if center_point >= nx - width / 2:
        center_point = nx - width / 2
    left = center_point - width / 2
    right = center_point + width / 2
    vertices = np.array([[(left, 0), (left, ny), (right, ny), (right, 0)]], dtype=np.int32)
    ignore_mask_color = (255, 255, 255)
    cv2.fillPoly(mask, vertices, ignore_mask_color)
    masked = cv2.bitwise_and(mask, img)
    hist = np.sum(masked[:, :, 0], axis=0)
    # Recenter on the strongest column only when it is clearly populated.
    # FIX (readability): was `if max(hist>10000):` -- python's builtin max
    # over a boolean numpy array, i.e. "any column sum exceeds 10000";
    # spelled explicitly here with an identical truth value.
    if hist.max() > 10000:
        center = np.argmax(hist)
    else:
        center = center_point
    return masked, center
def lane_from_window(binary, center_point, width):
    """Trace a single lane line through a warped binary image.

    Splits the image into 6 horizontal zones and masks each with a
    window re-centred on the pixels found in the zone below it, starting
    from `center_point` at the bottom. Returns the stacked masked image.
    """
    n_zones = 6
    ny, nx, nc = binary.shape
    zones = binary.reshape(n_zones, -1, nx, nc)[::-1]  # bottom zone first
    window, center = get_next_window(zones[0], center_point, width)
    for zone in zones[1:]:
        masked_zone, center = get_next_window(zone, center, width)
        window = np.vstack((masked_zone, window))
    return window
# Extract each lane from the warped binary test image, seeding the
# windows near the expected left/right lane positions.
left_binary = lane_from_window(warped_binary, 380, 300)
right_binary = lane_from_window(warped_binary, 1000, 300)
plt.imsave(out_dir + 'left_lane.jpg', left_binary)
plt.imsave(out_dir + 'right_lane.jpg', right_binary)

fig, (axl, axr) = plt.subplots(1, 2, figsize=(24, 9))
fig.tight_layout()
axl.imshow(left_binary)
axl.set_title('Left Lane', fontsize=40)
axr.imshow(right_binary)
axr.set_title('Right Lane', fontsize=40)
# Per-lane-line tracker used to smooth detections across video frames.
class Line:
    """Tracks one lane line, keeping a rolling buffer of the last `n`
    accepted polynomial fits for temporal smoothing."""

    def __init__(self, n=5):
        self.n = n                      # buffer length
        self.n_buffered = 0             # number of fits currently buffered
        self.detected = False           # was the line found last iteration?
        self.recent_xfitted = deque([], maxlen=n)     # last n fitted x arrays
        self.avgx = None                # average of recent_xfitted
        self.recent_fit_coeffs = deque([], maxlen=n)  # last n coefficient sets
        self.avg_fit_coeffs = None      # average of recent_fit_coeffs
        self.current_fit_xvals = [np.array([False])]  # most recent fitted x values
        self.current_fit_coeffs = [np.array([False])] # most recent coefficients
        self.allx = None                # x of detected lane pixels
        self.ally = None                # y of detected lane pixels
        # Fixed y sample points spanning the image height (0..720).
        self.fit_yvals = np.linspace(0, 100, num=101) * 7.2
        self.radius_of_curvature = None # curvature at the image bottom
        self.line_pos = None            # fitted x (px) at the image bottom
        self.line_base_pos = None       # metres from vehicle centre
        self.diffs = np.array([0, 0, 0], dtype='float')  # coeff change vs avg

    def set_current_fit_xvals(self):
        # Evaluate the current quadratic fit at the fixed y samples.
        self.current_fit_xvals = np.polyval(self.current_fit_coeffs, self.fit_yvals)

    def add_data(self):
        # Push the current fit onto both buffers (deques drop the oldest).
        self.recent_xfitted.appendleft(self.current_fit_xvals)
        self.recent_fit_coeffs.appendleft(self.current_fit_coeffs)
        assert len(self.recent_xfitted) == len(self.recent_fit_coeffs)
        self.n_buffered = len(self.recent_xfitted)

    def pop_data(self):
        # Drop the oldest buffered fit (used when a frame is rejected).
        if self.n_buffered > 0:
            self.recent_xfitted.pop()
            self.recent_fit_coeffs.pop()
            assert len(self.recent_xfitted) == len(self.recent_fit_coeffs)
            self.n_buffered = len(self.recent_xfitted)
        return self.n_buffered

    def set_avgx(self):
        # Average the buffered fitted x arrays element-wise.
        if len(self.recent_xfitted) > 0:
            self.avgx = np.mean([np.asarray(x) for x in self.recent_xfitted], axis=0)

    def set_avgcoeffs(self):
        # Average the buffered coefficient sets element-wise.
        if len(self.recent_fit_coeffs) > 0:
            self.avg_fit_coeffs = np.mean(
                [np.asarray(c) for c in self.recent_fit_coeffs], axis=0)

    def set_allxy(self, lane_candidate):
        # Bright pixels (>254) in channel 0 are the lane candidates.
        self.ally, self.allx = (lane_candidate[:, :, 0] > 254).nonzero()

    def set_current_fit_coeffs(self):
        # Fit x = a*y^2 + b*y + c through the candidate pixels.
        self.current_fit_coeffs = np.polyfit(self.ally, self.allx, 2)

    def get_diffs(self):
        # Deviation of the current fit from the buffered average.
        if self.n_buffered > 0:
            self.diffs = self.current_fit_coeffs - self.avg_fit_coeffs
        else:
            self.diffs = np.array([0, 0, 0], dtype='float')

    def set_radius_of_curvature(self):
        # Evaluate curvature at the bottom of the image.
        y_eval = max(self.fit_yvals)
        c = self.avg_fit_coeffs
        if c is not None:
            self.radius_of_curvature = \
                ((1 + (2 * c[0] * y_eval + c[1]) ** 2) ** 1.5) / np.absolute(2 * c[0])

    def set_line_base_pos(self):
        # x-origin of the fitted line at the image bottom ...
        y_eval = max(self.fit_yvals)
        self.line_pos = np.polyval(self.current_fit_coeffs, y_eval)
        basepos = 640  # image centre in pixels
        # ... converted to metres from centre (3.7 m is about 600 px in x).
        self.line_base_pos = (self.line_pos - basepos) * 3.7 / 600.0

    def accept_lane(self):
        # Sanity checks on the computed metrics before buffering the fit.
        flag = True
        maxdist = 2.8  # distance in meters from the lane
        if abs(self.line_base_pos) > maxdist:
            print('lane too far away')
            flag = False
        if self.n_buffered > 0:
            relative_delta = self.diffs / self.avg_fit_coeffs
            # Allow at most this relative variation per coefficient.
            if not (abs(relative_delta) < np.array([0.7, 0.5, 0.15])).all():
                print('fit coeffs too far off [%]', relative_delta)
                flag = False
        return flag

    def update(self, lane):
        """Fit the given binary lane image and accept or reject the fit.

        Returns (detected, n_buffered)."""
        self.set_allxy(lane)
        self.set_current_fit_coeffs()
        self.set_current_fit_xvals()
        self.set_radius_of_curvature()
        self.set_line_base_pos()
        self.get_diffs()
        if self.accept_lane():
            self.detected = True
            self.add_data()
            self.set_avgx()
            self.set_avgcoeffs()
        else:
            self.detected = False
            self.pop_data()
            if self.n_buffered > 0:
                self.set_avgx()
                self.set_avgcoeffs()
        return self.detected, self.n_buffered
def get_binary_lane_image(img, line, window_center, width=300):
    """Extract the windowed binary image for one lane line.

    Uses the tracker's last known position when the line was detected in
    the previous frame; otherwise seeds the window on the histogram peak
    closest to `window_center`.
    """
    if line.detected:
        window_center = line.line_pos
    else:
        peaks, filtered = find_peaks(img, thresh=3000)
        if len(peaks) != 2:
            # Ambiguous frame -- save it for offline debugging.
            print('Trouble ahead! ' + str(len(peaks)) + ' lanes detected!')
            plt.imsave('troublesome_image.jpg', img)
        nearest = np.argmin(abs(peaks - window_center))
        window_center = peaks[nearest]
    return lane_from_window(img, window_center, width)
# Fit both lanes once on the extracted binaries and plot the detected
# pixels with the fitted polynomials (image y-axis points down).
left = Line()
right = Line()
detected_l, n_buffered_left = left.update(left_binary)
detected_r, n_buffered_right = right.update(right_binary)

yvals = left.fit_yvals
plt.plot(right.allx, right.ally, '.', color='red')
plt.plot(right.current_fit_xvals, yvals, color='green', linewidth=3)
plt.plot(left.allx, left.ally, '.', color='red')
plt.plot(left.current_fit_xvals, yvals, color='green', linewidth=3)
plt.xlim(0, 1280)
plt.ylim(0, 720)
plt.gca().invert_yaxis()
plt.savefig(out_dir + 'fitted_lanes.jpg')
plt.show()
def project_lane_lines(img, left_fitx, right_fitx, yvals):
    """Paint the lane polygon between the two fitted lines back onto the
    undistorted camera image."""
    # Blank bird's-eye canvas to draw the polygon on.
    color_warp = np.zeros_like(img).astype(np.uint8)
    # Build a closed polygon: left line top-to-bottom, right line reversed.
    pts_left = np.array([np.transpose(np.vstack([left_fitx, yvals]))])
    pts_right = np.array([np.flipud(np.transpose(np.vstack([right_fitx, yvals])))])
    pts = np.hstack((pts_left, pts_right))
    cv2.fillPoly(color_warp, np.int_([pts]), (0, 255, 0))
    undist = undistort(img)
    # warp() with tobird=False returns the bird's-eye -> camera matrix.
    _, Minv = warp(img, tobird=False)
    newwarp = cv2.warpPerspective(color_warp, Minv, (img.shape[1], img.shape[0]))
    # Blend the lane polygon over the camera image.
    return cv2.addWeighted(undist, 1, newwarp, 0.3, 0)
def process_image(img):
    """Full per-frame pipeline: undistort -> binarize -> warp -> track
    both lanes -> project the lane area and metrics onto the frame.

    Uses the global `left`/`right` Line trackers so their smoothing
    state carries across video frames. Returns the annotated RGB frame.
    """
    global left
    global right
    undist = undistort(img)
    binary, _ = binarize(undist)
    warped, _ = warp(binary)
    warped_binary = region_of_interest(warped)

    # Seed each search window; reuse the last known position if available.
    window_center_l = 340
    if left.detected:
        window_center_l = left.line_pos
    left_binary = get_binary_lane_image(warped_binary, left, window_center_l, width=300)
    window_center_r = 940
    if right.detected:
        window_center_r = right.line_pos
    right_binary = get_binary_lane_image(warped_binary, right, window_center_r, width=300)

    # BUG FIX: update() was previously called TWICE per frame for each
    # line, inserting duplicate fits into the smoothing buffers and
    # skewing the averages; call it exactly once.
    detected_l, n_buffered_left = left.update(left_binary)
    detected_r, n_buffered_right = right.update(right_binary)

    left_fitx = left.avgx
    right_fitx = right.avgx
    yvals = left.fit_yvals
    lane_width = 3.7
    # Offset of the vehicle from lane centre, in metres.
    off_center = -1 * round(0.5 * (right.line_base_pos - lane_width / 2)
                            + 0.5 * (abs(left.line_base_pos) - lane_width / 2), 2)
    result = project_lane_lines(img, left_fitx, right_fitx, yvals)
    font = cv2.FONT_HERSHEY_SIMPLEX
    str1 = str('Distance from center: ' + str(off_center) + 'm')
    cv2.putText(result, str1, (430, 630), font, 1, (0, 0, 255), 2, cv2.LINE_AA)
    # Average the curvature over whichever lines currently have one.
    cnt_int = 0
    curvature = 0
    if left.radius_of_curvature:
        curvature = curvature + left.radius_of_curvature
        cnt_int = cnt_int + 1
    if right.radius_of_curvature:
        curvature = curvature + right.radius_of_curvature
        cnt_int = cnt_int + 1
    if cnt_int > 0:
        curvature = int(curvature / cnt_int)
    str2 = str('Radius of curvature: ' + str(curvature) + 'm')
    cv2.putText(result, str2, (430, 670), font, 1, (0, 0, 255), 2, cv2.LINE_AA)
    return result
# Sanity-check the whole pipeline on one test image with fresh trackers.
img = plt.imread('./test_images/test3.jpg')
left = Line()
right = Line()
result = process_image(img)
plt.figure()
plt.imshow(result)
plt.show()
plt.imsave(out_dir + 'detected_lane_test3.jpg', result)
import glob  # already imported at the top; repeated so this cell runs standalone
# Run the pipeline over every test image, resetting the trackers per image.
image_path = glob.glob('./test_images/*.jpg')
for image_names in image_path:
    image = mpimg.imread(image_names)
    print('This image path/name is:', image_names, 'with dimesions:', image.shape)
    left = Line()
    right = Line()
    # strip the './test_images/' prefix (14 chars) and the '.jpg' suffix
    save_image_names = './output_images/test_results/detected_lane_' + image_names[14:-4] + '.jpg'
    print('This save image path/name is:', save_image_names)
    result = process_image(image)
    plt.imshow(result)
    plt.show()
    mpimg.imsave(save_image_names, result)
# Run the full pipeline over the project video and embed the result.
# NOTE: relies on the global `left`/`right` Line state persisting across
# frames (it is NOT reset here).
out_dir='./output_images/test_results/'
output = out_dir+'detected_lane_project_video.mp4'
clip = VideoFileClip("project_video.mp4")
# fl_image applies process_image to every RGB frame of the clip.
out_clip = clip.fl_image(process_image)
%time out_clip.write_videofile(output, audio=False)
HTML("""
<video width="960" height="540" controls>
<source src="{0}">
</video>
""".format(output))
# Same pipeline on the harder challenge video.
output2 = out_dir+'detected_lane_challenge_video.mp4'
clip2 = VideoFileClip("challenge_video.mp4")
out_clip2 = clip2.fl_image(process_image)
%time out_clip2.write_videofile(output2, audio=False)
HTML("""
<video width="960" height="540" controls>
<source src="{0}">
</video>
""".format(output2))